iT邦幫忙

2024 iThome 鐵人賽

DAY 9
0
自我挑戰組

30 天程式學習筆記:我的自學成長之路系列 第 9

[DAY 9] Python 開發者的福音: Airflow 幫你輕鬆駕馭工作流程

  • 分享至 

  • xImage
  •  

你是否厭倦了手動執行重複性任務?超過 80% 的工程師都面臨著這個問題。Airflow 是一個強大的開源工作流程自動化平台,可以幫助你擺脫繁瑣的手動操作,讓你專注於更重要的任務。

Airflow 用於以程式化的方式設計、安排和監控工作流程。由於 Airflow 是用 Python 開發的,只要熟悉 Python 語言,就能輕鬆上手。此外,作為一個專門處理工作流程的工具,Airflow 能夠與多種類型的工具、軟體和資料庫無縫整合。即使 Airflow 本身不直接支援串接某些特定工具,使用者也可以透過自定義插件來擴展其功能,展現出極高的靈活性。具體可以實現哪些事情,大家就參考官網吧!

Airflow特色

  1. 開源
  2. 用Python開發
  3. 可擴展性強
  4. 好用的UI介面
  5. 強大的集成

Airflow核心概念

  1. 有向無環圖(DAG):
    DAG 是一種具有方向但不會形成循環的圖結構,代表一組有依賴關係的任務,通常用來描述工作流程。DAG 描述了任務的執行順序和依賴關係,但不包含具體的執行邏輯。由於 Airflow 在執行工作流程時,這些流程是線性的、有限的,最終會結束,因此用無環圖來表示是非常適合的。DAG 決定了每個任務節點之間的順序和關係,從而設計出工作流程的運作方式。

  2. 任務(Task):
    Task 是 DAG 中的基本執行單元,每個 Task 都代表一個獨立的操作,可以是運行一個 Python 腳本、執行一個 SQL 查詢、調用 API 等。

  3. 操作器(Operator):
    Operator 是 Task 的模板,定義了 Task 的行為,是更細緻的執行單元,它們被包含在 Task 之中。Airflow 提供了多種內建 Operator ,如 PythonOperatorBashOperatorMySqlOperator 等,使用者也可以自定義 Operator 。每個 Task 可能由多個 Operator 組成,而每個 Operator 負責執行特定的任務。這樣的結構使得任務可以靈活地拆解和執行。以下是幾個常見的 Operator 範例:

    • PythonOperator:可以調用 Python 函數來執行任務。
    • EmailOperator:用於發送 Email 通知。
    • PostgreSQLOperator:負責操作與連接 PostgreSQL 資料庫。
      https://ithelp.ithome.com.tw/upload/images/20240903/20167760cyynlHKfRe.png
  4. 調度器(Scheduler):
    Scheduler 負責根據 DAG 定義的時間間隔調度任務。它會持續監控 DAG,發現需要執行的任務並將其放入執行隊列。

  5. 執行器(Executor):
    Executor 負責執行 Scheduler 分配的任務。Airflow 支援多種執行器,包括 SequentialExecutor(順序執行)、LocalExecutor(並行執行)、CeleryExecutor(分佈式執行)等。

  6. 鉤子(Hook):
    Hook 是對外部系統的介面,如資料庫、API 等。Operator 通常通過 Hook 與外部系統交互。

  7. 連接(Connection):
    Connection 是對外部資源的配置,包含訪問外部資源所需的認證資訊和其他相關配置。

使用方法

  1. DAG 定義:
    DAG 通常用 Python 代碼定義,包含 DAG 的 ID、調度間隔、任務依賴關係等資訊。以下是一個簡單的範例:

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from datetime import datetime
    
    with DAG(
        'example_dag',
        default_args={
            'start_date': datetime(2023, 1, 1),
            'retries': 1,
        },
        schedule_interval='@daily',
    ) as dag:
    
        task1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
    
        task2 = BashOperator(
            task_id='echo_hello',
            bash_command='echo Hello, World!',
        )
    
        task1 >> task2
    
    
  2. 任務調度:
    配置好 DAG 後,Airflow 調度器會自動根據 DAG 定義的調度間隔執行任務。使用者可以在 Web 介面中監控任務的執行情況,並進行手動觸發或暫停任務。

  3. 錯誤處理:
    Airflow 提供了多種錯誤處理機制,包括任務重試、警報通知、任務依賴關係定義等。使用者可以通過配置任務的 retriesretry_delayemail_on_failure 等參數來實現錯誤處理。

  4. 任務依賴管理:
    在定義 DAG 時,任務之間的依賴關係是通過 >><< 操作符指定的。Airflow 還支援基於任務執行狀態的動態依賴管理,例如 TriggerRule

實用技巧

  1. 分離邏輯和配置:
    盡量將任務的執行邏輯與 DAG 定義分離,使 DAG 文件簡潔易讀,且便於維護。例如,可以將複雜的任務邏輯封裝在獨立的 Python 模組中,然後在 DAG 中調用。
  2. 使用模板化功能:
    Airflow 支援 Jinja 模板引擎,可以在任務中動態生成命令或腳本。利用模板化功能可以使 DAG 更加靈活和通用。
  3. 高效管理連接:
    使用 Airflow 提供的連接管理功能,統一配置和管理訪問外部系統的憑證和參數,避免在代碼中硬編碼這些敏感資訊。
  4. 監控和調試:
    使用 Airflow 的 Web 介面和日誌系統監控任務執行狀況,迅速定位和解決問題。這對於維持工作流的穩定運行至關重要。

上一篇
[DAY 8] .gitattributes 進階應用: 掌控 GitHub 儲存庫語言設定
下一篇
[DAY 10] Airflow 實戰演練(1):從網頁爬蟲到資料庫儲存與郵件通知
系列文
30 天程式學習筆記:我的自學成長之路30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言